package org.pentaho.di.trans.steps.abnormalcheck;

import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.exception.KettleStepException;
import org.pentaho.di.core.row.RowMetaInterface;
import org.pentaho.di.core.row.ValueMetaInterface;
import org.pentaho.di.core.util.Utils;
import org.pentaho.di.i18n.BaseMessages;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.trans.step.*;
import org.pentaho.di.trans.util.DataFileCacher;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Transformation step that buffers every incoming row, accumulates per-field statistics
 * (count, sum and, when requested, the raw values for a median) and, once the input is
 * exhausted, appends a status flag to each row before passing it on.
 *
 * <p>The flag is {@code "1"} when the row satisfies the configured comparison against the
 * value computed by {@link AbnormalCheckFunction} and {@code "0"} otherwise. All configured
 * fields share a single flag slot located directly after the input fields.
 *
 * @author dc
 * @date 2022/10/19 16:25
 */
public class AbnormalCheck extends BaseStep implements StepInterface {
    private static Class<?> PKG = AbnormalCheck.class; // for i18n purposes, needed by Translator2!! $NON-NLS-1$

    /** Flag value written when a row satisfies the configured comparison. */
    private static final String right = "1";
    /** Flag value written when a row does not satisfy the configured comparison. */
    private static final String wrong = "0";
    /** Number of buffered rows at which the in-memory buffer is flushed to the file cache. */
    private static final int FLUSH_THRESHOLD = 5000;

    /** Per configured field: the numeric values seen so far; only allocated when the median flag is set. */
    private ArrayList<Number>[] dataCache;
    private AbnormalCheckMeta meta;
    private AbnormalCheckData data;
    /** In-memory buffer of rows not yet flushed to {@link #dataFileCacher}. */
    List<Object[]> allColumnData;
    /** File-backed cache that holds every row until the statistics are complete. */
    private DataFileCacher dataFileCacher;

    public AbnormalCheck(StepMeta stepMeta, StepDataInterface stepDataInterface, int copyNr, TransMeta transMeta, Trans trans) {
        super(stepMeta, stepDataInterface, copyNr, transMeta, trans);
    }

    /**
     * Processes one input row, or finishes the step when the input is exhausted.
     *
     * <ol>
     *   <li>On the first row, resolves the configured source fields and prepares the caches.</li>
     *   <li>Buffers every row (spilling to the file cache in batches) and updates the running
     *       count/sum of each configured field.</li>
     *   <li>When no more rows arrive, reads all rows back, appends the status flag according to
     *       the configured comparison and emits them.</li>
     * </ol>
     *
     * @param smi step metadata, expected to be an {@link AbnormalCheckMeta}
     * @param sdi step data, expected to be an {@link AbnormalCheckData}
     * @return {@code true} while more rows are expected, {@code false} once the output is done
     * @throws KettleException if a configured field is missing, unspecified or of an unsupported
     *                         type, or if a downstream call fails
     */
    @Override
    public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
        meta = (AbnormalCheckMeta) smi;
        data = (AbnormalCheckData) sdi;
        Object[] r = getRow();
        if (r == null) {
            finishProcessing();
            return false;
        }
        if (first) {
            first = false;
            initializeOnFirstRow();
        }
        bufferRow(r);
        accumulateStatistics(r);
        if (log.isRowLevel()) {
            logRowlevel("Read row #" + getLinesRead() + " : " + Arrays.toString(r));
        }
        if (checkFeedback(getLinesRead())) {
            logBasic("Linenr " + getLinesRead());
        }
        return true;
    }

    /**
     * Flushes the remaining buffered rows, reads everything back, flags the rows and emits them.
     * When the step never received a row there is nothing to flush or emit, so it only signals
     * that the output is done.
     */
    private void finishProcessing() throws KettleException {
        if (first) {
            // No row ever arrived: the buffer and the file cache were never created.
            setOutputDone();
            return;
        }
        if (!allColumnData.isEmpty()) {
            dataFileCacher.serializObjectToFile(allColumnData);
        }
        dataFileCacher.close(false);
        allColumnData = null;
        List<Object[]> outputRows = generateOutputRows(dataFileCacher.deserializeFileToObject());
        for (Object[] outputRow : outputRows) {
            putRow(data.getOutputRowMeta(), outputRow);
        }
        setOutputDone();
        dataCache = null;
    }

    /**
     * Creates the row caches, derives the output row layout and resolves the column index of
     * every configured source field.
     *
     * @throws KettleException if a source field is not specified, cannot be found in the input
     *                         row, or is neither numeric nor a string
     */
    private void initializeOnFirstRow() throws KettleException {
        // NOTE(review): the cache name is only transformation name + timestamp; two steps or step
        // copies starting in the same millisecond would presumably share it - confirm with DataFileCacher.
        String fileName = getTransMeta().getName() + System.currentTimeMillis();
        dataFileCacher = new DataFileCacher(fileName);
        allColumnData = new ArrayList<>();

        data.setInputRowMeta(getInputRowMeta());
        RowMetaInterface outPutRowMeta = data.getInputRowMeta().clone();
        data.setOutputRowMeta(outPutRowMeta);
        meta.getFields(data.getOutputRowMeta(), getStepname(), null, null, this, repository, metaStore);

        int fieldCount = meta.getNumFieldsToProcess();
        // Generic array creation is unavoidable here; every slot is only ever assigned an ArrayList<Number>.
        @SuppressWarnings("unchecked")
        ArrayList<Number>[] caches = new ArrayList[fieldCount];
        dataCache = caches;
        data.setFunctions(new AbnormalCheckFunction[fieldCount]);
        for (int i = 0; i < fieldCount; i++) {
            AbnormalCheckConfig config = meta.getConfigs()[i];
            AbnormalCheckFunction function = new AbnormalCheckFunction();
            data.getFunctions()[i] = function;
            if (Utils.isEmpty(config.getSourceFieldName())) {
                throw new KettleException("There is no input field specified for stats calc #" + (i + 1));
            }
            int fieldIndex = data.getInputRowMeta().indexOfValue(config.getSourceFieldName());
            if (fieldIndex < 0) {
                throw new KettleStepException("Unable to find the specified fieldname '"
                        + config.getSourceFieldName() + "' for stats calc #" + (i + 1));
            }
            function.m_columnIndex = fieldIndex;
            ValueMetaInterface inputFieldMeta = data.getInputRowMeta().getValueMeta(fieldIndex);
            if (!inputFieldMeta.isNumeric() && !inputFieldMeta.isString()) {
                throw new KettleException("The input field for stats calc #" + (i + 1) + " is not numeric.");
            }
            if (config.isMedianFlag()) {
                // The median needs the individual values, not just count and sum.
                dataCache[i] = new ArrayList<Number>();
            }
        }
    }

    /**
     * Adds the row to the in-memory buffer, first spilling the buffer to the file cache when it
     * has reached {@link #FLUSH_THRESHOLD} rows.
     */
    private void bufferRow(Object[] r) throws KettleException {
        if (allColumnData.size() >= FLUSH_THRESHOLD) {
            dataFileCacher.serializObjectToFile(allColumnData);
            allColumnData.clear();
        }
        allColumnData.add(r);
    }

    /**
     * Updates count and sum (and the median cache, if enabled) of every configured field with the
     * values of the given row. Values that cannot be converted to a number are treated as missing.
     */
    private void accumulateStatistics(Object[] r) {
        for (int i = 0; i < meta.getNumFieldsToProcess(); i++) {
            AbnormalCheckConfig config = meta.getConfigs()[i];
            if (Utils.isEmpty(config.getSourceFieldName())) {
                continue;
            }
            AbnormalCheckFunction function = data.getFunctions()[i];
            ValueMetaInterface metaI = getInputRowMeta().getValueMeta(function.m_columnIndex);
            Number input = null;
            try {
                input = metaI.getNumber(r[function.m_columnIndex]);
            } catch (Exception ignored) {
                // Deliberately ignored: anything not parsable as a number counts as missing.
            }
            if (input == null) {
                continue;
            }
            if (config.isMedianFlag()) {
                dataCache[i].add(input);
            }
            function.m_count++;
            function.m_sum += input.doubleValue();
        }
    }

    /**
     * Flags every cached row for each configured field and returns the flagged rows.
     *
     * @param allColumnData all rows read back from the file cache; modified in place
     * @return the same list, with the status flag stored directly after the input fields
     */
    private List<Object[]> generateOutputRows(List<Object[]> allColumnData) {
        int valueSize = getInputRowMeta().size();
        for (int i = 0; i < meta.getNumFieldsToProcess(); i++) {
            AbnormalCheckConfig config = meta.getConfigs()[i];
            if (!Utils.isEmpty(config.getSourceFieldName())) {
                // Compute the reference value for this field, then compare every row against it.
                AbnormalCheckFunction function = data.getFunctions()[i];
                double calVal = function.generateOutputValues(config, dataCache[i]);
                judgeBool(config, function, calVal, allColumnData, i, valueSize);
            }
        }
        return allColumnData;
    }

    /**
     * Resolves the comparison configured for a field and flags all rows with it. An unknown (or
     * missing) comparison symbol is logged as an error and leaves the rows untouched.
     *
     * @param config        configuration of the field being checked
     * @param function      holds the column index of the field
     * @param calVal        reference value the field is compared against
     * @param allColumnData rows to flag; modified in place
     * @param fieldPosition zero-based position of this field among the configured fields
     * @param valueSize     number of input fields, i.e. the index of the flag slot
     */
    private void judgeBool(AbnormalCheckConfig config, AbnormalCheckFunction function, double calVal,
                           List<Object[]> allColumnData, int fieldPosition, int valueSize) {
        String compareSymbols = config.getCompareSymbols();
        Comparison comparison = Comparison.fromSymbol(compareSymbols);
        if (comparison == null) {
            this.log.logError(BaseMessages.getString(PKG, "AbnormalCheckFunction.compareSymbols not found :{}") + compareSymbols);
            return;
        }
        flagRows(comparison, function, calVal, allColumnData, fieldPosition, valueSize);
    }

    /**
     * Writes the status flag of every row for one configured field.
     *
     * <p>For every field after the first, rows already flagged as failed by an earlier field are
     * skipped so that the failure is kept. Cells that are null or not parsable as a number are
     * treated as missing and flagged as failed instead of aborting the step.
     */
    private void flagRows(Comparison comparison, AbnormalCheckFunction function, double calVal,
                          List<Object[]> rows, int fieldPosition, int valueSize) {
        for (int i = 0; i < rows.size(); i++) {
            Object[] row = rows.get(i);
            boolean hasFlagSlot = row.length > valueSize;
            // The flag lives at index valueSize, which is not necessarily the last array element
            // because row arrays may be allocated larger than the number of fields.
            if (fieldPosition > 0 && hasFlagSlot && row[valueSize] != null
                    && wrong.equals(row[valueSize].toString())) {
                continue;
            }
            Double value = toDouble(row[function.m_columnIndex]);
            String flag = (value != null && comparison.test(value, calVal)) ? right : wrong;
            if (hasFlagSlot) {
                row[valueSize] = flag;
            } else {
                // The row is exactly as wide as the input: grow it by one slot for the flag.
                Object[] widened = new Object[valueSize + 1];
                System.arraycopy(row, 0, widened, 0, row.length);
                widened[valueSize] = flag;
                rows.set(i, widened);
            }
        }
    }

    /**
     * Converts a cell to a double via its string form.
     *
     * @return the parsed value, or {@code null} when the cell is null or not a valid number
     */
    private static Double toDouble(Object cell) {
        if (cell == null) {
            return null;
        }
        try {
            return Double.valueOf(cell.toString());
        } catch (NumberFormatException ignored) {
            // Not a number: reported to the caller as missing.
            return null;
        }
    }

    /** The comparison operators that can be configured for a field. */
    private enum Comparison {
        EQ("="), GT(">"), LT("<"), GE(">="), LE("<=");

        private final String symbol;

        Comparison(String symbol) {
            this.symbol = symbol;
        }

        /** Returns the comparison for the given symbol, or {@code null} if the symbol is unknown or null. */
        static Comparison fromSymbol(String symbol) {
            for (Comparison candidate : values()) {
                if (candidate.symbol.equals(symbol)) {
                    return candidate;
                }
            }
            return null;
        }

        /** Returns whether {@code value <op> threshold} holds for this operator. */
        boolean test(double value, double threshold) {
            switch (this) {
                case EQ:
                    return value == threshold;
                case GT:
                    return value > threshold;
                case LT:
                    return value < threshold;
                case GE:
                    return value >= threshold;
                case LE:
                    return value <= threshold;
                default:
                    throw new IllegalStateException("Unhandled comparison: " + this);
            }
        }
    }

    /**
     * Stores the step metadata and data and delegates to the base step initialisation.
     *
     * @return the result of {@code super.init(smi, sdi)}
     */
    @Override
    public boolean init(StepMetaInterface smi, StepDataInterface sdi) {
        meta = (AbnormalCheckMeta) smi;
        data = (AbnormalCheckData) sdi;
        return super.init(smi, sdi);
    }

}
    
